package com.example.spider.cluster.handler;

import com.example.spider.cluster.common.dto.StandMqMessage;
import com.example.spider.cluster.common.dto.SpiderResultDTO;
import com.example.spider.cluster.common.stream.HandlerChannel;
import com.example.spider.cluster.common.stream.SpiderQueueDefinition;
import com.example.spider.common.handler.SpiderResultHandlerManager;
import com.example.spider.common.task.SpiderTask;
import com.example.spider.common.task.SpiderTaskSender;
import com.example.spider.common.util.JsonUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 发送爬虫任务
 * @author lym
 */
@Slf4j
@Component
@EnableBinding(HandlerChannel.class)
public class CloudStreamSpiderTaskSender implements SpiderTaskSender {

    @Autowired
    private HandlerChannel handlerChannel;

    @Autowired
    private SpiderResultHandlerManager resultHandlerManager;

    @StreamListener(SpiderQueueDefinition.SPIDER_RESULT_IN)
    public void input(Message<StandMqMessage> resultMsg, @Header(AmqpHeaders.CHANNEL) Channel channel,
                      @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {
        try {
            log.debug("received SpiderResultDTO");
            SpiderResultDTO result = SpiderResultDTO.fromMq(resultMsg.getPayload());
            resultHandlerManager.handle(result.getSpiderTask(), result.getResult());
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("error when received SpiderTask", e);
        }
    }

    @Override
    public void send(SpiderTask spiderTask) {
        StandMqMessage standMqMessage = new StandMqMessage();
        standMqMessage.setType(spiderTask.getType());
        standMqMessage.setData(JsonUtils.toJson(spiderTask));
        handlerChannel.putTasl().send(MessageBuilder.withPayload(standMqMessage).build());
    }

}
